package com.alinesno.cloud.compoment.kafka.config;

import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.ContainerProperties.AckMode;
import org.springframework.retry.RecoveryCallback;
import org.springframework.retry.RetryContext;

import cn.hutool.setting.dialect.Props;

/**
 * Kafka consumer configuration.
 * <p>
 * Declares two listener container factories: a default one and one whose
 * acknowledgment mode is {@link AckMode#MANUAL_IMMEDIATE}. Consumer settings
 * are read from {@code classpath:kafka-consumer.properties} and converted to a
 * map with the inherited {@code toMap} helper.
 *
 * @author WeiXiaoJin
 * @since 2019-04-09 11:20:24
 */
public class KafkaConsumerConfig extends Config {

	private static final Logger log = LoggerFactory.getLogger(KafkaConsumerConfig.class);

	/** Location of the consumer settings file. */
	private static final String CONSUMER_PROPERTIES_RESOURCE = "classpath:kafka-consumer.properties";

	/** Charset used to decode the consumer settings file. */
	private static final Charset CONSUMER_PROPERTIES_CHARSET = StandardCharsets.UTF_8;

	/** Number of concurrent consumers for the default listener container factory. */
	private static final int DEFAULT_CONCURRENCY = 3;

	@Autowired
	private KafkaProperties kafkaProperties;

	/**
	 * Builds the default listener container factory: concurrency of 3 and a
	 * poll timeout of 3000 ms, backed by {@link #consumerFactory()}.
	 *
	 * @return the configured listener container factory
	 */
	@Bean
	public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
		ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
		factory.setConsumerFactory(consumerFactory());
		factory.setConcurrency(DEFAULT_CONCURRENCY);
		factory.getContainerProperties().setPollTimeout(3000);
		return factory;
	}

	/**
	 * Creates the consumer factory used by the default listener container factory.
	 *
	 * @return a consumer factory built from {@link #consumerConfigs()}
	 */
	@Bean
	public ConsumerFactory<String, String> consumerFactory() {
		return new DefaultKafkaConsumerFactory<>(consumerConfigs());
	}

	/**
	 * Loads the consumer settings from {@code classpath:kafka-consumer.properties}
	 * (decoded as UTF-8) and converts them to a map.
	 * <p>
	 * NOTE(review): no key/value deserializer is set here; presumably the
	 * properties file supplies them — confirm.
	 *
	 * @return the consumer settings as a map
	 */
	@Bean
	public Map<String, Object> consumerConfigs() {
		Properties properties = Props.getProp(CONSUMER_PROPERTIES_RESOURCE, CONSUMER_PROPERTIES_CHARSET);

		Map<String, Object> props = toMap(properties);

		// NOTE(review): this logs the injected Spring Boot KafkaProperties object,
		// not the map loaded from the properties file above. Left unchanged on
		// purpose, since dumping the loaded map could expose credentials.
		log.debug("初始化客户端kafka配置:{}", kafkaProperties);

		return props;
	}

	/**
	 * Builds the listener container factory for manually acknowledged listeners.
	 * <p>
	 * The acknowledgment mode is {@link AckMode#MANUAL_IMMEDIATE}, the idle event
	 * interval is 100 ms, the poll timeout is 1000 ms and discarded records are
	 * acknowledged. The recovery callback logs the failure and returns
	 * {@code null}, so the failure is not rethrown from the callback.
	 *
	 * @return the configured listener container factory
	 */
	@Bean
	public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaManualAckListenerContainerFactory() {
		ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();

		factory.setConsumerFactory(manualConsumerFactory());
		ContainerProperties props = factory.getContainerProperties();

		props.setAckMode(AckMode.MANUAL_IMMEDIATE);
		props.setIdleEventInterval(100L);
		props.setPollTimeout(1000L);

		factory.setAckDiscarded(true);

		factory.setRecoveryCallback(new RecoveryCallback<Void>() {
			@Override
			public Void recover(RetryContext context) throws Exception {
				// Keep the original best-effort behaviour (return null), but leave
				// a trace so the failure is no longer silently discarded.
				log.warn("Kafka listener recovery invoked, retryCount={}", context.getRetryCount(),
						context.getLastThrowable());
				return null;
			}
		});

		return factory;
	}

	/**
	 * Creates the consumer factory for manually acknowledged listeners, with
	 * {@code enable.auto.commit} set to {@code false}.
	 *
	 * @return a consumer factory with auto-commit disabled
	 */
	@Bean
	public ConsumerFactory<String, String> manualConsumerFactory() {
		// Work on a private copy: the map returned by consumerConfigs() may be a
		// shared bean instance, and must not pick up the auto-commit override.
		Map<String, Object> configs = new HashMap<>(consumerConfigs());
		configs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
		return new DefaultKafkaConsumerFactory<>(configs);
	}

}